package com.wu.rocketmq_demo.listener;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Push-mode RocketMQ listener that prints every received message to standard output.
 *
 * <p>{@link #init()} creates a {@link DefaultMQPushConsumer} for group {@code group_test},
 * points it at the name server {@code 127.0.0.1:9876}, subscribes to all tags of
 * {@code topic_test} in clustering mode and registers this object as the message listener.
 * {@link #destroy()} shuts that consumer down again.
 *
 * <p>NOTE(review): the {@code @Component} annotation below is commented out, so component
 * scanning will not pick this class up; the {@code @PostConstruct}/{@code @PreDestroy}
 * callbacks only run if the instance is registered as a managed bean some other way, or
 * if the methods are invoked manually — confirm the intended wiring.
 *
 * @author benjamin_5
 * @date 2024/6/17
 */
//@Component
public class Consumer1PushListener implements MessageListenerConcurrently {

    /** Consumer group passed to the {@link DefaultMQPushConsumer} constructor. */
    private static final String CONSUMER_GROUP = "group_test";

    /** Name server address the consumer connects to. */
    private static final String NAMESRV_ADDR = "127.0.0.1:9876";

    /** Topic this listener subscribes to. */
    private static final String TOPIC = "topic_test";

    /** Subscription expression; {@code "*"} is used to subscribe to every tag. */
    private static final String SUB_EXPRESSION = "*";

    /** The consumer started by {@link #init()}; {@code null} until startup succeeds. */
    private DefaultMQPushConsumer consumer;

    /**
     * Prints each message of the batch as {@code <topic>:<body decoded as UTF-8>}.
     *
     * @param list                       the batch of messages handed over by the client
     * @param consumeConcurrentlyContext consume context (not used)
     * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (MessageExt msg : list) {
            byte[] body = msg.getBody();
            // A message without a body is printed as an empty payload instead of
            // failing the whole batch with a NullPointerException.
            String messageBody = body == null ? "" : new String(body, StandardCharsets.UTF_8);
            System.out.println(msg.getTopic() + ":" + messageBody);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * Creates, configures and starts the push consumer.
     *
     * <p>Startup is best-effort: any exception from {@code subscribe} or {@code start} is
     * printed together with a failure message and is not propagated to the caller.
     */
    @PostConstruct
    public void init() {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        pushConsumer.setNamesrvAddr(NAMESRV_ADDR);
        // Clustering consumption mode
        pushConsumer.setMessageModel(MessageModel.CLUSTERING);
        // Register this object as the listener
        pushConsumer.registerMessageListener(this);
        try {
            // Subscribe to the topic (all tags)
            pushConsumer.subscribe(TOPIC, SUB_EXPRESSION);
            // Start the consumer instance
            pushConsumer.start();
            // Keep a reference only after a successful start so destroy() can stop it.
            this.consumer = pushConsumer;
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("rocketmq 消费者启动失败");
        }
    }

    /**
     * Shuts down the consumer started by {@link #init()}, if there is one.
     * Calling this method when no consumer is running does nothing.
     */
    @PreDestroy
    public void destroy() {
        if (consumer != null) {
            consumer.shutdown();
            consumer = null;
        }
    }
}
